Published on

SeaTunnel 安装配置与实时数据处理实战指南

Authors
  • avatar
    Name
    Liant
    Twitter

Seatunnel 安装及使用

官方介绍: 下一代高性能、分布式、海量数据集成框架

介绍

本文基于最新的 2.3.0 版本作为实践基础.

官方文档 由于项目还是早期,所以资料很少.

结构图

SeaTunnel

FlinkSpark作为设计思想,从source中获取数据,经过transform,最终写入到sink中.

坑点

  1. 使用官方的推荐版本,Oracle JDKFLink 1.3
  2. Flink作为执行引擎较为稳定.以SeaTunnel本身的引擎作为运行引擎问题较多
  3. 2.3版本不稳定

安装

1. Java环境

Java 版本号至少是大于8的版本,官方使用Oracle JDK.不要使用OpenJDK.

具体安装见于网络,资料较多.

下载 flink-1.13.6 程序

启动 flink

# 保持运行
./bin/start-cluster.sh

访问localhost:8081,查看Flink的WebUI

3. 安装Seatunnel

下载程序 Seatunnel 2.3.0

解压到指定目录

4. 安装连接器插件

安装Seatunnel的插件

bash bin/install_plugin.sh

config/plugin_config 文件中可以调整需要的连接器.

--connectors-v2--
connector-cdc-mysql
connector-dingtalk
connector-elasticsearch
connector-http-wechat
connector-jdbc
connector-kafka
connector-mongodb
connector-pulsar
connector-rabbitmq
connector-redis
connector-socket
--end--

由于需要使用到maven,最好调整一下maven的repo的仓库地址.可以加速下载连接器

# 查看maven版本以及其他信息
./mvnw -version
# 进入 moven_home 目录,修改配置文件 conf/settings.xml 中的仓库地址
<mirror>
    <id>alimaven</id>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <mirrorOf>central</mirrorOf>
</mirror>

5. 配置

编辑文件 config/seatunnel-env.sh

# 需要指向flink的安装目录
FLINK_HOME=/usr/opt/flink

使用

编辑任务

编写一个任务配置,该任务模式是STREAMING,这意味着不会停止.

env {
    execution.planner=blink
	execution.parallelism = 1
	job.mode = "STREAMING"
}

source {
    Socket{ # 从Socket中读取数据.监听本机,端口9999
    	host = "localhost"
        port = 9999
        result_table_name = "fake" # 数据集命名为fake
        field_name = "info" # 字段命名为info
    }
}

transform {
	Split{ # 切割字段
		separator = "#"
		fields = ["name","age"]
	}
	sql { # 使用flink的SQL处理字段
		sql = "select info, split(info) as info_row from fake"
	}
}

sink {
  Console {  # 输出到控制台,如果是在Flink下运行任务,数据会作为标准输出,记录在Flink服务器日志中
  }
}

将任务提交到Flink中

# 先打开本机的9999端口
nc -lk 9999
# 提交任务到SeaTunnel中
./bin/start-seatunnel-flink-connector-v2.sh -c config/flink1111.config

在 nc 所在终端继续输入

name01#10
name02#20

那么在Flink的webUI中即可看到任务标准输出

fields : value
types : STRING
fields : value
types : STRING
subtaskIndex=0: row=1 : name01#10
subtaskIndex=0: row=1 : name02#20
结果类似于: output

transform 目前支持好像有问题, 定义的 split 函数转换无效

参考文档

  1. Seatunnel document